RabbitMQ远程过程调用RPC (java)

您所在的位置:网站首页 rabbitmq rpc远程调用 RabbitMQ远程过程调用RPC (java)

RabbitMQ远程过程调用RPC (java)

2023-08-25 23:48| 来源: 网络整理| 查看: 265

RPC,远程过程调用,主要功能是让构建分布式更容易,在提供强大的远程调用能力不足是不损失本地调用的语义简洁性。

通俗来讲,假设有俩台服务器A,B,一个应用部署在甲服务器上,一个应用部署在乙服务器上,A服务器应用想要调用b服务器上的资源(函数或方法),由于不在同一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

流程 

*客户端启动时,会创建一个匿名的独占回调队列。 

*对于RPC请求,客户端发送带有两个属性的消息:replyTo(设置为回调队列)和correlationId(设置为每个请求的唯一值)。  *请求被发送到rpc_queue队列。  * RPC服务器监听rpc_queue队列中请求。当出现请求时,它会执行该作业,接收队列就是replyTo设定的回调队列。  *客户端监听回调队列。出现消息时,它会检查correlationId属性。如果它与请求中的值匹配,则返回对应用程序的响应。  (客户端,我有一堆东西,你处理一下,的correlationID,我的请求标识,回复处理完结果返回这个中队列。  服务端的:处理完了,返回,的的correlationID,客户端拿它的ID和服务端返回的ID对比,然后进行接收)

案例如下:

服务端代码:

 

/** * @Author liuzengli * @Date 2018/11/29 20:45 * @Description * @Param * @Return **/ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; //RPC调用服务端 public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { //• 先建立连接、通道,并声明队列 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(""); factory.setUsername(""); factory.setPassword(""); factory.setPort(AMQP.PROTOCOL.PORT); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); //•可以运行多个服务器进程。通过channel.basicQos设置prefetchCount属性可将负载平均分配到多台服务器上。 channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); //打开应答机制autoAck=false channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { //等待客户端回应 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //客户端执行 consumer.nextDelivery() 响应之后进入 BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder() .correlationId(props.getCorrelationId()).build(); //获取客户端发送的主体 String message = new String(delivery.getBody()); //调用服务端方法解析之后返回给客户端 System.out.println(" [.] getMd5String(" + message + ")"); String response = getMd5String(message); //返回处理结果队列 channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes()); //发送应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } // 模拟RPC方法 获取MD5字符串 public static String getMd5String(String str) { MessageDigest md5 = null; try { md5 = MessageDigest.getInstance("MD5"); } catch (Exception e) { System.out.println(e.toString()); e.printStackTrace(); return ""; } char[] charArray = str.toCharArray(); byte[] byteArray = new byte[charArray.length]; for (int i = 0; i < charArray.length; i++) byteArray[i] = (byte) charArray[i]; byte[] md5Bytes = md5.digest(byteArray); StringBuffer hexValue = new StringBuffer(); for (int i = 0; i < md5Bytes.length; i++) { int val = ((int) md5Bytes[i]) & 0xff; if (val < 16) hexValue.append("0"); hexValue.append(Integer.toHexString(val)); } return hexValue.toString(); } }

客户端代码:

 

/** * @Author liuzengli * @Date 2018/11/29 20:45 * @Description * @Param * @Return **/ import java.security.MessageDigest; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; //RPC调用客户端 public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { //• 先建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(""); factory.setUsername(""); factory.setPassword(""); factory.setPort(AMQP.PROTOCOL.PORT); connection = factory.newConnection(); channel = connection.createChannel(); //• 注册'回调'队列,这样就可以收到RPC响应 replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } //发送RPC请求 public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); //发送请求消息,消息使用了两个属性:replyto和correlationId BasicProperties props = new BasicProperties.Builder() .correlationId(corrId).replyTo(replyQueueName).build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); //等待接收结果 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //检查它的correlationId是否是我们所要找的那个 if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] args) throws Exception { RPCClient rpcClient = new RPCClient(); System.out.println(" [x] Requesting getMd5String(abc)"); String response = rpcClient.call("abc"); System.out.println(" [.] Got '" + response + "'"); rpcClient.close(); } }

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3